package com.hsogoo.message.impl.robbitMq;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.hsogoo.message.MqSender;
import com.hsogoo.message.bean.MessageBean;
import com.hsogoo.message.dao.MessageDao;
import com.hsogoo.message.enums.MessageStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback, MqSender, InitializingBean {

    private RabbitTemplate txRabbitTemplate;
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @Autowired
    private ConnectionFactory connectionFactory;
    @Autowired
    private MessageDao messageDao;

    @Override
    public void afterPropertiesSet() throws Exception {
        txRabbitTemplate = new RabbitTemplate(connectionFactory);
        txRabbitTemplate.setConfirmCallback(this);
    }

    public void sendTxMessage(final String exchange, final String routingKey, final String message) {

        MessageBean messageBean = new MessageBean.Builder()
                                        .exchange(exchange)
                                        .routingKey(routingKey)
                                        .message(message)
                                        .status(MessageStatus.INIT.name())
                                        .createTime(new Date())
                                        .updateTime(new Date())
                                        .build();
        Long saveId = messageDao.saveMessage(messageBean);
        if(TransactionSynchronizationManager.isSynchronizationActive()){
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                try {
                    txRabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(String.valueOf(messageBean.getId())));
                    log.info("amqpTemplate.convertAndSend,, exchange=[{}], exchange=[{}], message=[{}]", exchange, routingKey, message);
                } catch (Exception e) {
                    log.error("error in convertAndSend, exchange=[{}], exchange=[{}], message=[{}]", exchange, routingKey, message , e);
                }
                }
            });
        }else{
            txRabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(String.valueOf(messageBean.getId())));
            log.info("amqpTemplate.convertAndSend, exchange=[{}], exchange=[{}], message=[{}]", exchange, routingKey, message);
        }
    }

    public void sendMessage(String exchange, String routingKey, String message) {
        try{
            rabbitTemplate.convertAndSend(exchange, routingKey, message);
            log.info("rabbitTemplate.convertAndSend, exchange=[{}], exchange=[{}], message=[{}]", exchange, routingKey, message);
        }catch (Exception e){
            log.error("failed to send mq message", e);
        }
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("message confirm result:{},\t{},\t{}", correlationData, ack, cause);
        if (ack) {
            if (correlationData != null) {
                MessageBean messageBean = new MessageBean.Builder()
                                            .id(Long.valueOf(correlationData.getId()))
                                            .status(MessageStatus.FINISH.name()).build();
                messageDao.updateMessageStatus(messageBean);
            }
        } else {
            log.error("message unReceivable ,cause:{}", cause);
        }
    }

}
